Feature: Incremental Append Scan#3364
Conversation
f86284f to
1fd9274
Compare
| class BaseScan(ABC): | ||
| """A base class for all table scans.""" | ||
|
|
||
| class TableScan(ABC): |
There was a problem hiding this comment.
This isn't a rename or removal (the diff is misleading) - TableScan is just moved below the new BaseScan class
| and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number | ||
| ) | ||
| @property | ||
| def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: |
There was a problem hiding this comment.
This is a public property, so keeping it for back-compat
|
|
||
|
|
||
| class DataScan(TableScan): | ||
| def _build_partition_projection(self, spec_id: int) -> BooleanExpression: |
There was a problem hiding this comment.
This is now moved into ManifestGroupPlanner so it can be shared with DataScan and IncrementalAppendScan.
| def plan_files( | ||
| self, | ||
| manifests: Iterable[ManifestFile], | ||
| manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, |
There was a problem hiding this comment.
This manifest filter is new. Introducing that for append scan logic where some manifests are skipped
| table_identifier=self._identifier, | ||
| ) | ||
|
|
||
| def incremental_append_scan( |
There was a problem hiding this comment.
New convenience method mirroring Table.scan (naming thought). Args mirror scan minus snapshot_id plus the two snapshot-range args.
| to return in the output dataframe. | ||
| case_sensitive: | ||
| If True column matching is case sensitive. | ||
| from_snapshot_id_exclusive: |
There was a problem hiding this comment.
Requiring from_snapshot_id_exclusive to be non-None at plan time is a deliberate divergence from Java's IncrementalScan semantics (where the start defaults to the oldest ancestor of the end snapshot when not configured). Follows Spark's required start-snapshot-id (docs). Argument here — TL;DR an append scan only reads append snapshots, so "from the oldest ancestor" would be misleading after a replace.
| ) -> DataScan: | ||
| raise ValueError("Cannot scan a staged table") | ||
|
|
||
| def incremental_append_scan( |
There was a problem hiding this comment.
Mirrors StagedTable.scan two lines up — staged tables have no committed metadata to scan against.
| A = TypeVar("A", bound="BaseScan", covariant=True) | ||
|
|
||
|
|
||
| class BaseScan(ABC): |
There was a problem hiding this comment.
BaseScan is new; TableScan is unchanged in surface but now subclasses it. Why split:
- This PR keeps
snapshot_id,catalog,table_identifier,use_ref,snapshot(), and abstractcount()onTableScanto avoid the breaking change #533 introduced when it dropped these. - That makes
TableScansnapshot-specific, so it isn't a sensible base class for incremental scans (which have two snapshot IDs, not one). BaseScantherefore holds the genuinely-shared surface (row filter, projection, options, limit, chaining helpers, format-converter sinks built onto_arrow()).
I don't love this — if breaking TableScan were acceptable we could collapse the hierarchy like #533. See prior thinking and follow-up.
There was a problem hiding this comment.
Also pointing out: I could've avoided changing existing code entirely and having a completely independent class for append scans with duplicated manifest planning logic. I felt as though:
- the hierarchy with
TableScanandDataScan(prior to this PR) would then feel odd with a fully independentIncrementalAppendScan - duplicated code is code smell, so I've gone with a refactor here. To note that it's largely just moving code around than anything else! Let me know what folks think or design suggestions here, very open to changes
(I realise this makes the diff here scary 😄 )
| @abstractmethod | ||
| def plan_files(self) -> Iterable[ScanTask]: ... | ||
|
|
||
| def to_arrow(self) -> pa.Table: |
There was a problem hiding this comment.
Materialization stays abstract on BaseScan. Both DataScan and IncrementalAppendScan implement to_arrow / to_arrow_batch_reader as one-line delegations to the module-level helpers _to_arrow_via_file_scan_tasks / _to_arrow_batch_reader_via_file_scan_tasks above.
A BaseScan-level default would require Iterable[FileScanTask], but BaseScan.plan_files() returns Iterable[ScanTask] — Liskov-widened so that future non-file scans (e.g. changelog) can return a different task type. Mypy arg-type makes the default-on-base form impossible without specialising the base. Helpers keep the dedup without that constraint.
to_pandas / to_polars / to_duckdb / to_ray do get pulled up to BaseScan as defaults — they only need to_arrow() on self, no FileScanTask typing. Prior thinking.
| def with_case_sensitive(self: A, case_sensitive: bool = True) -> A: | ||
| return self.update(case_sensitive=case_sensitive) | ||
|
|
||
| def to_pandas(self, **kwargs: Any) -> pd.DataFrame: |
There was a problem hiding this comment.
to_pandas / to_polars were previously abstract on TableScan. They now have default implementations on BaseScan (built on to_arrow()). Prior thinking.
| """ | ||
| return self.to_arrow().to_pandas(**kwargs) | ||
|
|
||
| def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None = None) -> DuckDBPyConnection: |
There was a problem hiding this comment.
to_duckdb and to_ray were previously only on DataScan, not even on TableScan. Pulling them up to BaseScan means TableScan and any external subclass now inherit them. Net additive. Prior thinking.
| S = TypeVar("S", bound="TableScan", covariant=True) | ||
|
|
||
|
|
||
| class TableScan(BaseScan, ABC): |
There was a problem hiding this comment.
Was a direct ABC; now extends BaseScan. All previously-present fields, methods, and abstract API are preserved (see #3364 (comment)). The only behavioural delta is that previously-abstract methods on TableScan (to_pandas, to_polars) now have default implementations inherited from BaseScan.
| @cached_property | ||
| def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: | ||
| return KeyDefaultDict(self._build_partition_projection) | ||
| def _manifest_planner(self) -> ManifestGroupPlanner: |
There was a problem hiding this comment.
Cached so that the planner's own partition_filters cached_property lives for the scan's lifetime — matches the pre-PR caching behaviour on DataScan (where partition_filters was itself a cached_property directly).
| partition_type = spec.partition_type(self.table_metadata.schema()) | ||
| partition_schema = Schema(*partition_type.fields) | ||
| partition_expr = self.partition_filters[spec_id] | ||
| def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]: |
There was a problem hiding this comment.
Public; only call site within PyIceberg is pyiceberg/table/inspect.py. Kept for back-compat — external library users may rely on it. Body now delegates to ManifestGroupPlanner.plan_manifest_entries so the work isn't duplicated with IncrementalAppendScan. (Prior context on whether the underscore-prefixed helpers needed a deprecation cycle — they're gone now and aren't documented as supported.)
| which can be used to read a stream of record batches one by one. | ||
| """ | ||
| import pyarrow as pa | ||
| class IncrementalAppendScan(BaseScan): |
There was a problem hiding this comment.
Mirrors Java's IncrementalAppendScan interface and BaseIncrementalAppendScan implementation. Only the append variant of IncrementalScan — changelog scan is out of scope here.
|
|
||
| def to_pandas(self, **kwargs: Any) -> pd.DataFrame: | ||
| """Read a Pandas DataFrame eagerly from this Iceberg table. | ||
| def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int | None) -> IAS: |
There was a problem hiding this comment.
Maps to Java's fromSnapshotExclusive(long). We don't expose the String ref overload or useBranch — Spark passes raw IDs anyway, and ref support can be added later without breaking anything.
|
|
||
| con = connection or duckdb.connect(database=":memory:") | ||
| con.register(table_name, self.to_arrow()) | ||
| def projection(self) -> Schema: |
There was a problem hiding this comment.
Always uses the table's current schema, unlike TableScan.projection() which uses the snapshot's schema when snapshot_id is set. Matches Java: BaseTable.newIncrementalAppendScan constructs the scan with schema(), which on BaseTable.schema() returns ops.current().schema() — the table's current schema, not snapshot-bound. C++ does the same: TableScanBuilder::ResolveSnapshotSchema falls through to metadata_->Schema() for incremental scans (no snapshot_id on the context). Older-schema rows in range get NULL for new columns — covered by test_incremental_append_scan_schema_evolution_within_range.
| return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) | ||
|
|
||
| return con | ||
| def plan_files(self) -> Iterable[FileScanTask]: |
There was a problem hiding this comment.
Mirrors Java's BaseIncrementalAppendScan.doPlanFiles and appendFilesFromSnapshots — walk ancestors, filter to append snapshots, dedup manifests whose added_snapshot_id is in range, then filter manifest entries by (snapshot_id in range, status == ADDED). Set semantics on the manifest dedup match the Java snippet and rely on ManifestFile.__eq__/__hash__ being defined (which they are on main since #2233).
|
|
||
| def to_polars(self) -> pl.DataFrame: | ||
| """Read a Polars DataFrame from this Iceberg table. | ||
| def _validate_and_resolve_snapshots(self) -> tuple[int, int]: |
There was a problem hiding this comment.
Two semantic notes:
from(exclusive) is validated viais_parent_ancestor_of, notis_ancestor_of— matches Java'sBaseIncrementalScan.fromSnapshotIdExclusive(see the inline comment there about expiry) and C++'sinternal::FromSnapshotIdExclusive. This admits cursors whosefromsnapshot has since been expired (canonical incremental-ingestion pattern); fabricated IDs are still rejected.- Equal
from/toraises (a snapshot is never its own parent ancestor), again matching Java/C++.
| return self.from_snapshot_id_exclusive, to_snapshot_id | ||
|
|
||
|
|
||
| class ManifestGroupPlanner: |
There was a problem hiding this comment.
Motivated by Java's ManifestGroup — both DataScan and IncrementalAppendScan need to plan file scan tasks from a set of manifests with optional filtering, and this is the natural shape for that (prior thinking). All the _build_* helpers and _check_sequence_number are moved from DataScan, not new.
| return result | ||
| executor = ExecutorFactory.get_or_create() | ||
| return executor.map( | ||
| lambda args: _open_manifest(*args), |
There was a problem hiding this comment.
Extracted so both DataScan.scan_plan_helper (kept for back-compat / inspect.py) and plan_files below can share the partition-summary / per-file evaluator pipeline.
| yield from ancestors_of(to_snapshot, table_metadata) | ||
|
|
||
|
|
||
| def ancestors_between_ids( |
There was a problem hiding this comment.
Mirrors Java's SnapshotUtil.ancestorsBetween. Differs from the existing ancestors_between (snapshot-based, inclusive-inclusive) above by taking IDs and being exclusive-inclusive, to match the incremental-scan validation pattern. Raises if to_snapshot_id_inclusive is missing from metadata, mirroring Java.
| yield from ancestors_of(to_snapshot, table_metadata) | ||
|
|
||
|
|
||
| def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int, table_metadata: TableMetadata) -> bool: |
There was a problem hiding this comment.
Mirrors Java's SnapshotUtil.isParentAncestorOf, including the Cannot find snapshot raise on missing snapshot (Java throws one hop down, via ancestorsOf(long, lookup)).
|
|
||
| @pytest.mark.integration | ||
| @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) | ||
| def test_incremental_append_scan_metrics_pruning(catalog: Catalog) -> None: |
There was a problem hiding this comment.
Filters on a non-partition column (number), so the manifest and partition evaluators degenerate to ALWAYS_TRUE and it's the per-file metrics evaluator (column min/max/null stats) that must do all the pruning. Covers a layer of ManifestGroupPlanner that the existing DataScan integration coverage doesn't exercise end-to-end through a real scan.
| Returns: | ||
| pa.Table: Materialized Arrow Table from the Iceberg table's DataScan | ||
| """ | ||
| def count(self) -> int: |
There was a problem hiding this comment.
(This code is not new, just moved)
| @cached_property | ||
| def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: | ||
| return KeyDefaultDict(self._build_partition_projection) | ||
| def _to_arrow_via_file_scan_tasks(scan: BaseScan, tasks: Iterable[FileScanTask]) -> pa.Table: |
There was a problem hiding this comment.
Introducing this helper + (the one below) specialised for FileScanTask. We don't want to have this be the default implementation on BaseScan because it requires FileScanTask specifically and not all table scans will have FileScanTask planned in general (i.e. changelogs)
Closes #2634.
Rationale for this change
Largely a revival of Revival of #2634 (comment). Please see that issue and previous PRs for context and motivation.
References: https://github.com/apache/iceberg (containing Iceberg-Java and Spark, both are relevant to us), and apache/iceberg-cpp#590. Note: I've asked an LLM to drop review comments on this PR linking to relevant places in the references mentioned, to aid reviewing.
Are these changes tested?
Yes, both unit and integration tests can be found in this PR.
Are there any user-facing changes?
Yes, there are removal of private methods but not public changes apart from the new feature. Please see the PR comments for more information.